import json
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Sequence, Tuple

import gradio as gr
from gradio.components import Component  # cannot use TYPE_CHECKING here

from ..chat import ChatModel
from ..data import Role
from ..extras.misc import torch_gc
from ..hparams import GeneratingArguments
from .common import get_save_dir
from .locales import ALERTS


if TYPE_CHECKING:
    from .manager import Manager


class WebChatModel(ChatModel):
    """Chat model controlled from the web UI.

    The underlying ``ChatModel`` is initialized lazily: the instance starts with
    ``model`` and ``tokenizer`` set to ``None`` and the parent initializer is
    only invoked from ``load_model`` (or eagerly in ``__init__`` when
    ``lazy_init`` is false or ``demo_mode`` is enabled).
    """

    def __init__(
        self, manager: "Manager", demo_mode: Optional[bool] = False, lazy_init: Optional[bool] = True
    ) -> None:
        """Create the wrapper and optionally load a model right away.

        Args:
            manager: UI manager used to resolve component names to components.
            demo_mode: When truthy, try to load the model described by
                ``demo_config.json`` in the current directory; loading and
                unloading through the UI is then refused.
            lazy_init: When falsy, call the parent initializer immediately
                without arguments.
        """
        self.manager = manager
        self.demo_mode = demo_mode
        self.model = None
        self.tokenizer = None
        self.generating_args = GeneratingArguments()

        if not lazy_init:  # read arguments from command line
            super().__init__()

        if demo_mode:  # load demo_config.json if exists
            try:
                with open("demo_config.json", "r", encoding="utf-8") as f:
                    args = json.load(f)

                # Explicit check instead of `assert`, which is removed under `python -O`.
                if args.get("model_name_or_path") and args.get("template"):
                    super().__init__(args)
                else:
                    print("Please provided model name and template in `demo_config.json`.")
            except Exception:
                # Best effort: any failure here (missing/unreadable/invalid file, or an
                # error raised while loading the model) is reported with this message.
                print("Cannot find `demo_config.json` at current directory.")

    @property
    def loaded(self) -> bool:
        """Whether a model is currently held by this instance."""
        return self.model is not None

    def load_model(self, data: Dict[Component, Any]) -> Generator[str, None, None]:
        """Load the model selected in the UI, yielding status messages.

        Args:
            data: Mapping from UI components to their current values.

        Yields:
            A localized error message (after which the generator stops), or the
            "loading" message followed by the "loaded" message.
        """

        def get(name: str) -> Any:
            # Resolve a component by its name and return its current value.
            return data[self.manager.get_elem_by_name(name)]

        lang = get("top.lang")
        error = ""
        if self.loaded:
            error = ALERTS["err_exists"][lang]
        elif not get("top.model_name"):
            error = ALERTS["err_no_model"][lang]
        elif not get("top.model_path"):
            error = ALERTS["err_no_path"][lang]
        elif self.demo_mode:
            error = ALERTS["err_demo"][lang]

        if error:
            gr.Warning(error)
            yield error
            return

        adapters = get("top.adapter_path")
        if adapters:
            model_name = get("top.model_name")
            finetuning_type = get("top.finetuning_type")
            # Selected adapters are turned into save directories and joined with commas.
            adapter_name_or_path = ",".join(
                [get_save_dir(model_name, finetuning_type, adapter) for adapter in adapters]
            )
        else:
            adapter_name_or_path = None

        yield ALERTS["info_loading"][lang]

        quantization_bit = get("top.quantization_bit")
        booster = get("top.booster")
        rope_scaling = get("top.rope_scaling")
        args = dict(
            model_name_or_path=get("top.model_path"),
            adapter_name_or_path=adapter_name_or_path,
            finetuning_type=get("top.finetuning_type"),
            # Only "8" and "4" are converted to an integer bit width; anything else disables it.
            quantization_bit=int(quantization_bit) if quantization_bit in ["8", "4"] else None,
            template=get("top.template"),
            flash_attn=(booster == "flash_attn"),
            use_unsloth=(booster == "unsloth"),
            rope_scaling=rope_scaling if rope_scaling in ["linear", "dynamic"] else None,
        )
        super().__init__(args)

        yield ALERTS["info_loaded"][lang]

    def unload_model(self, data: Dict[Component, Any]) -> Generator[str, None, None]:
        """Drop the current model and tokenizer, yielding status messages.

        Args:
            data: Mapping from UI components to their current values.

        Yields:
            The demo-mode error message (after which the generator stops), or
            the "unloading" message followed by the "unloaded" message.
        """
        lang = data[self.manager.get_elem_by_name("top.lang")]

        if self.demo_mode:
            gr.Warning(ALERTS["err_demo"][lang])
            yield ALERTS["err_demo"][lang]
            return

        yield ALERTS["info_unloading"][lang]
        self.model = None
        self.tokenizer = None
        torch_gc()
        yield ALERTS["info_unloaded"][lang]

    def predict(
        self,
        chatbot: List[Tuple[str, str]],
        role: str,
        query: str,
        messages: Sequence[Dict[str, str]],
        system: str,
        tools: str,
        max_new_tokens: int,
        top_p: float,
        temperature: float,
    ) -> Generator[Tuple[Sequence[Tuple[str, str]], Sequence[Dict[str, str]]], None, None]:
        """Stream a reply to ``query`` and update the chat display incrementally.

        Args:
            chatbot: Displayed chat history; a new ``[query, reply]`` entry is
                appended to it in place and updated as text arrives.
            role: Role recorded for the new query message.
            query: Text of the new message.
            messages: Previous messages as ``{"role": ..., "content": ...}`` dicts.
                It is concatenated with a list, so a list is expected.
            system: System prompt forwarded to ``stream_chat``.
            tools: Tool description forwarded to ``stream_chat``; when non-empty,
                the accumulated response is passed through the template's tool extractor.
            max_new_tokens: Generation limit forwarded to ``stream_chat``.
            top_p: Sampling parameter forwarded to ``stream_chat``.
            temperature: Sampling parameter forwarded to ``stream_chat``.

        Yields:
            ``(chatbot, output_messages)`` after every streamed chunk, where
            ``output_messages`` is the input history plus the query and the
            response accumulated so far.
        """
        chatbot.append([query, ""])
        query_messages = messages + [{"role": role, "content": query}]
        response = ""
        for new_text in self.stream_chat(
            query_messages, system, tools, max_new_tokens=max_new_tokens, top_p=top_p, temperature=temperature
        ):
            response += new_text
            if tools:
                result = self.template.format_tools.extract(response)
            else:
                result = response

            if isinstance(result, tuple):
                # A tuple result is treated as a tool call: (name, JSON-encoded arguments).
                name, arguments = result
                arguments = json.loads(arguments)
                tool_call = json.dumps({"name": name, "arguments": arguments}, ensure_ascii=False)
                output_messages = query_messages + [{"role": Role.FUNCTION.value, "content": tool_call}]
                bot_text = "```json\n" + tool_call + "\n```"
            else:
                output_messages = query_messages + [{"role": Role.ASSISTANT.value, "content": result}]
                bot_text = result

            chatbot[-1] = [query, self.postprocess(bot_text)]
            yield chatbot, output_messages

    def postprocess(self, response: str) -> str:
        """Escape ``<`` and ``>`` outside of triple-backtick fenced sections.

        The text is split on the fence marker; even-indexed pieces lie outside
        fences and are escaped, odd-indexed pieces are kept verbatim.
        """
        blocks = response.split("```")
        for i, block in enumerate(blocks):
            if i % 2 == 0:
                blocks[i] = block.replace("<", "&lt;").replace(">", "&gt;")
        return "```".join(blocks)
